Conversation
82ada96 to
ff6c292
Compare
2053566 to
05c1625
Compare
wgtmac
left a comment
There was a problem hiding this comment.
I've carefully reviewed the retry mechanism and found a few parity issues and a structural data-loss concern regarding how pending updates are held during retries. Please see the inline comments.
|
I just recall a design flaw in the interaction between PendingUpdate and Transaction and created a fix: #591. Without this fix, users have to cache all created pending update instances, otherwise they cannot retry them since they are weak_ptr in the transaction instance. |
|
@linguoxuan Could you please rebase to resolve the conflict? |
296d9ed to
79c0218
Compare
wgtmac
left a comment
There was a problem hiding this comment.
Thanks for the update and fixing the CI! I've just reviewed it for a quick pass.
| ICEBERG_ASSIGN_OR_RAISE(auto refreshed_table, catalog_->LoadTable(identifier_)); | ||
| if (metadata_location_ != refreshed_table->metadata_file_location()) { | ||
| metadata_ = std::move(refreshed_table->metadata_); | ||
| metadata_location_ = std::string(refreshed_table->metadata_file_location()); |
There was a problem hiding this comment.
This looks like a bug that can be fixed separately and merged quickly
| } | ||
|
|
||
| Result<std::shared_ptr<Table>> Transaction::CommitOnce() { | ||
| auto refresh_result = ctx_->table->Refresh(); |
There was a problem hiding this comment.
It seems unnecessary to issue a Refresh() call on the first attempt since it may prohibit fast commit. Can we change to only call Refresh during retry?
|
|
||
| Result<std::shared_ptr<Table>> Transaction::CommitOnce() { | ||
| auto refresh_result = ctx_->table->Refresh(); | ||
| if (!refresh_result.has_value()) { |
There was a problem hiding this comment.
Let's reuse macros like ICEBERG_RETURN_UNEXPECTED and ICEBERG_ASSIGN_OR_RAISE to write less lines.
|
|
||
| } break; | ||
| Result<std::shared_ptr<Table>> commit_result; | ||
| if (!CanRetry()) { |
There was a problem hiding this comment.
The non-retry path and CommitOnce() both build requirements and call UpdateTable, but through different code paths. Consider unifying: even the non-retry case could go through CommitOnce() with num_retries=0, which would eliminate the branching and reduce maintenance burden.
| ctx_->metadata_builder = | ||
| TableMetadataBuilder::BuildFrom(ctx_->table->metadata().get()); | ||
| for (const auto& update : pending_updates_) { | ||
| auto commit_status = update->Commit(); |
There was a problem hiding this comment.
Should we directly call this->Apply(*update) to avoid an indirection?
| int64_t SnapshotUpdate::SnapshotId() { | ||
| if (!snapshot_id_.has_value()) { | ||
| while (!snapshot_id_.has_value() || | ||
| base().SnapshotById(snapshot_id_.value()).has_value()) { |
There was a problem hiding this comment.
Doesn't SnapshotUtil::GenerateSnapshotId below have the same loop?
|
|
||
| #include <algorithm> | ||
| #include <chrono> | ||
| #include <functional> |
| } | ||
|
|
||
| /// \brief Specify error types that should stop retries immediately | ||
| RetryRunner& StopRetryOn(std::initializer_list<ErrorKind> error_kinds) { |
There was a problem hiding this comment.
It would be good to document the priority between OnlyRetryOn and StopRetryOn.
| virtual Kind kind() const = 0; | ||
|
|
||
| /// \brief Whether this update can be retried after a commit conflict. | ||
| virtual bool IsRetryable() const { return true; } |
There was a problem hiding this comment.
I think the Java impl has other types of updates that disallow retry, e.g. UpdateSchema. We might need to evaluate if we want to keep the same behavior.
| // -------------------------------------------------------------------------- | ||
| // Test: Successful on first attempt — no retries | ||
| // -------------------------------------------------------------------------- | ||
| TEST(RetryRunnerTest, SuccessOnFirstAttempt) { |
There was a problem hiding this comment.
We only have test on RetryRunner. Can we add some integration test directly using Table or Transaction?
| Result<T> Run(F&& task, int32_t* attempt_counter = nullptr) { | ||
| auto start_time = std::chrono::steady_clock::now(); | ||
| int32_t attempt = 0; | ||
| int32_t max_attempts = config_.num_retries + 1; |
There was a problem hiding this comment.
Do we need to validate config_.num_retries?
|
|
||
| /// \brief Run a task that returns a Result<T> | ||
| template <typename F, typename T = typename std::invoke_result_t<F>::value_type> | ||
| Result<T> Run(F&& task, int32_t* attempt_counter = nullptr) { |
There was a problem hiding this comment.
Instead of a raw pointer attempt_counter, is it better to incorporate metrics reporter that is being added by @evindj?
| } | ||
|
|
||
| /// \brief Sleep for the specified duration | ||
| void Sleep(int32_t ms) const { |
This commit implements the retry for transaction commits. It introduces a generic RetryRunner utility with exponential backoff and error-kind filtering, and integrates it into Transaction::Commit() to automatically refresh table metadata and retry on commit conflicts.